Crate kafka_protocol

Implementation of the Kafka wire protocol in Rust.

This library follows the approach of the Kafka project and generates the protocol code from schemas defined in JSON. This ensures compatibility and makes it easy to stay synchronized when new features are added upstream.

The goal of this project is to provide 100% coverage of the Kafka API, as well as basic utilities for working with the protocol, including compression and record serialization.

Messages

The messages module contains the generated request and response structs. These structs allow easy serialization and deserialization via bytes::Bytes.

use bytes::{Bytes, BytesMut};
use kafka_protocol::messages::RequestHeader;
use kafka_protocol::protocol::{StrBytes, Encodable, Decodable};

let mut request_header = RequestHeader::default();
request_header.correlation_id = 1;
request_header.client_id = Some(StrBytes::from_str("test-client"));
let mut buf = BytesMut::new();
// Request header versions range from 0 to 2; encode and decode with the same version.
request_header.encode(&mut buf, 2).unwrap();
assert_eq!(request_header, RequestHeader::decode(&mut buf, 2).unwrap());

Note that every implementation of Encodable::encode and Decodable::decode requires an explicit version. This is because each message struct contains the fields of every version of that message. Fields are marked Option only when the message’s schema declares them nullable; a field that does not exist in a given version is not an Option but instead holds a default value that stands in for nil. It is the user’s responsibility to use only the fields that are valid for the decoded message version.
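
As a rough illustration of why the version must be passed explicitly, the following standard-library-only sketch hand-writes a version-gated message. ExampleMessage, its fields, and its wire layout are hypothetical and are not part of kafka_protocol; the generated structs in the messages module are more involved.

```rust
use std::convert::{TryFrom, TryInto};

/// Hypothetical message, not part of kafka_protocol.
/// `rack` exists on the wire only from version 1 onward.
#[derive(Debug, Default, PartialEq)]
struct ExampleMessage {
    id: i32,
    /// Valid for version >= 1; the empty string is the default ("nil") value.
    rack: String,
}

impl ExampleMessage {
    /// Writes only the fields that are valid for `version`.
    fn encode(&self, buf: &mut Vec<u8>, version: i16) {
        buf.extend_from_slice(&self.id.to_be_bytes());
        if version >= 1 {
            // Sketch only: assumes the string fits in an i16 length prefix.
            buf.extend_from_slice(&(self.rack.len() as i16).to_be_bytes());
            buf.extend_from_slice(self.rack.as_bytes());
        }
    }

    /// Reads the fields valid for `version`; all other fields keep their defaults.
    fn decode(buf: &[u8], version: i16) -> Option<Self> {
        let id = i32::from_be_bytes(buf.get(0..4)?.try_into().ok()?);
        let mut msg = ExampleMessage { id, ..Default::default() };
        if version >= 1 {
            let len = i16::from_be_bytes(buf.get(4..6)?.try_into().ok()?);
            let len = usize::try_from(len).ok()?;
            let end = 6usize.checked_add(len)?;
            msg.rack = String::from_utf8(buf.get(6..end)?.to_vec()).ok()?;
        }
        Some(msg)
    }
}
```

Decoding at version 0 leaves rack at its default, so the caller cannot tell from the struct alone whether the field was absent or empty. That is why the decoded version has to be tracked alongside the message.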

Sending a Request

A request can be created by serializing a messages::RequestHeader and any given request type to a bytes::Bytes buffer.

use kafka_protocol::protocol::{StrBytes, Encodable};
use bytes::{BytesMut, Bytes};
use kafka_protocol::messages::{RequestHeader, ApiKey, ApiVersionsRequest};

let mut buf = BytesMut::new();

// Header for an ApiVersions request at API version 3.
let mut req_header = RequestHeader::default();
req_header.request_api_version = 3;
req_header.request_api_key = ApiKey::ApiVersionsKey as i16;
req_header.client_id = Some(StrBytes::from_str("example"));
// The header is encoded with its own version (2 here), which is distinct from the API version.
req_header.encode(&mut buf, 2).unwrap();

// The request body follows the header in the same buffer.
let mut api_versions_req = ApiVersionsRequest::default();
api_versions_req.client_software_version = StrBytes::from_str("1.0");
api_versions_req.client_software_name = StrBytes::from_str("example-client");
api_versions_req.encode(&mut buf, 3).unwrap();

// implemented elsewhere
send_request(&buf[..]);
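
The example leaves send_request unspecified. On the wire, Kafka prefixes each request with its size as a 4-byte big-endian integer, so a sender would typically frame the encoded bytes before writing them. A minimal sketch of that framing, using only the standard library; the name write_framed and the explicit writer parameter are assumptions made for illustration and are not part of kafka_protocol:

```rust
use std::convert::TryFrom;
use std::io::{self, Write};

/// Hypothetical helper: writes a 4-byte big-endian length prefix followed by
/// the already-encoded header and request body.
fn write_framed<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    // The size field is a signed 32-bit integer, so reject oversized payloads.
    let len = i32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "request too large"))?;
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(payload)?;
    writer.flush()
}
```

Any std::io::Write implementation works as the writer, for example a std::net::TcpStream connected to a broker or a Vec<u8> in tests.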

Deserializing an Unknown Request

The messages module provides the enums messages::RequestKind, messages::ResponseKind, and messages::ApiKey for matching on unknown requests and responses.

A simple example for decoding an unknown message encoded in buf:

use kafka_protocol::messages::{RequestHeader, ApiVersionsRequest, ApiKey, RequestKind};
use kafka_protocol::protocol::{Encodable, Decodable, StrBytes};
use bytes::{BytesMut, Buf};
use std::convert::TryFrom;
use kafka_protocol::protocol::buf::ByteBuf;

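// The API version is the second field in the request header (bytes 2..4).
// The header has its own version, which depends on the API key and API version;
// this example reuses the API version for brevity.
let version = buf.peek_bytes(2..4).get_i16();
let header = RequestHeader::decode(&mut buf, version).unwrap();
// Look up the API by its key, not by its version.
let req = match ApiKey::try_from(header.request_api_key) {
    Ok(ApiKey::ApiVersionsKey) => RequestKind::ApiVersionsRequest(
        ApiVersionsRequest::decode(&mut buf, header.request_api_version).unwrap(),
    ),
    _ => panic!("unsupported or unknown API key"),
};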

// match on enum elsewhere and do work
match req {
    RequestKind::ApiVersionsRequest(req) => {
        assert_eq!(req.client_software_name.to_string(), "example-client".to_string());
    }
    _ => panic!()
}
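
The peek step above relies on the fixed layout at the start of every request header: a big-endian 16-bit API key followed by a big-endian 16-bit API version. The sketch below shows the same lookup on a plain byte slice with only the standard library; peek_api_key_and_version is a hypothetical helper, not a kafka_protocol function.

```rust
use std::convert::TryInto;

/// Hypothetical helper: reads the API key and API version from the first four
/// bytes of a request header without consuming the buffer.
/// Returns None if fewer than four bytes are available.
fn peek_api_key_and_version(buf: &[u8]) -> Option<(i16, i16)> {
    let api_key = i16::from_be_bytes(buf.get(0..2)?.try_into().ok()?);
    let api_version = i16::from_be_bytes(buf.get(2..4)?.try_into().ok()?);
    Some((api_key, api_version))
}
```

Returning an Option rather than panicking lets a server wait for more bytes when a header has only partially arrived.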

Re-exports

pub use error::ResponseError;

Modules

compression: Provides compression utilities for encoding records.
error: Error kinds for Kafka error codes.
messages: Messages used by the Kafka protocol.
protocol: Raw types and utilities for use with the Kafka protocol.
records: Provides utilities for working with records (Kafka messages).